Print the max element of an RDD.
// Largest element of the RDD. top(1) is defined as takeOrdered(1) under the
// reversed ordering, so it returns the same Array(9) as the original.
val seq = Seq(3, 9, 2, 3, 5, 4)
val rdd = sc.parallelize(seq, 2)
rdd.top(1)
// Oldest Person in the RDD: top(1) keyed on the age field is equivalent to
// takeOrdered(1) with the reversed age ordering.
case class Person(name: String, age: Int)
val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))
val rdd = sc.parallelize(people, 2)
rdd.top(1)(Ordering.by[Person, Int](_.age))
// Word count over three small documents, then the 3 most frequent and the
// 3 least frequent words by count.
val rdd1 = sc.parallelize(List("Hadoop PIG Hive", "Hive PIG PIG Hadoop", "Hadoop Hadoop Hadoop"))
val rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word, 1))
val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.top(3)(Ordering.by[(String, Int), Int](_._2))         // highest counts first
rdd3.takeOrdered(3)(Ordering.by[(String, Int), Int](_._2)) // lowest counts first
Find the max value together with its associated key.
val myRDD = sc.parallelize(Array(("a",1),("b",5),("c",1),("d",3))).sortBy(_._2,false).take(1)
How to calculate sum and count in a single groupBy?
// Sum AND count of Amt per id in a single groupBy. The original aggregated
// max("Amt") instead of a count, which did not answer the stated question.
val datasets = sc.parallelize(List(("A","HYD",10),("B","BLR",30),("A","HYD",40),("B","BLR",50),("C","DEL",60)))
val df1 = datasets.toDF("id", "Loc", "Amt")
import org.apache.spark.sql.functions._
df1.groupBy($"id").agg(sum($"Amt"), count($"Amt")).show()
Find the max value for each group
// Max value per key, approach 1: reduceByKey combines map-side, so only one
// value per key per partition crosses the network -- the preferred approach.
val rdd5=sc.parallelize(List(("v",3),("v", 1),("v", 1),("w", 7),("w", 1),("x", 3),("y", 1),("y", 1),("y", 2),("y", 3)))
rdd5.reduceByKey(math.max(_, _)).collect
// Approach 2: groupByKey ships every value for a key across the network
// before taking the max, so it is more expensive for the same result.
val rdd5=sc.parallelize(List(("v",3),("v", 1),("v", 1),("w", 7),("w", 1),("x", 3),("y", 1),("y", 1),("y", 2),("y", 3)))
rdd5.groupByKey().map(x => (x._1,x._2.max)).collect
Find the sum of the tuple values.
// Sum of the second tuple component, then basic aggregates over a range.
val x = List(("X45", 2), ("W80", 1), ("F03", 2), ("X61", 2))
val rdd = sc.parallelize(x)
val values = rdd.map { case (_, v) => v }
values.collect
values.sum
values.sum // the original showed the same sum via two equivalent spellings
val rdd2 = sc.parallelize(1 to 20)
rdd2.sum
rdd2.max
rdd2.min
// Total Amt per (id, Loc) pair; the grouped .sum shortcut produces the same
// "sum(Amt)" column as agg(sum("Amt")).
val datasets = sc.parallelize(List(("A", "HYD", 10), ("B", "BLR", 30), ("A", "HYD", 40), ("B", "BLR", 50), ("C", "DEL", 60)))
val df1 = datasets.toDF("id", "Loc", "Amt")
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
df1.groupBy("id", "Loc").sum("Amt").show
Find the average value per key.
// Average value per key: aggregateByKey accumulates a (runningSum, count)
// pair per key, then the final division yields the mean.
val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
val agg_rdd = rdd.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // fold one value into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))  // merge per-partition accumulators
// Divide as Double: the original Int division silently truncates whenever
// the sum is not an exact multiple of the count.
val sum = agg_rdd.mapValues { case (s, c) => s.toDouble / c }
sum.collect
// Add the two components of each record's value tuple, then total per key.
val data = Array(("9888wq", (1, 2)), ("abcd", (1, 1)), ("abcd", (3, 2)), ("9888wq", (4, 2)))
val rdd = sc.parallelize(data)
val result = rdd.mapValues { case (a, b) => a + b }.reduceByKey(_ + _)
result.foreach(println)
Input
Output
// Max element of the RDD: take the first element under the reversed
// (descending) Int ordering.
val seq = Seq(3,9,2,3,5,4)
val rdd = sc.parallelize(seq,2)
rdd.takeOrdered(1)(Ordering[Int].reverse)
// Oldest Person: reverse the Int ordering and key it on the age field.
case class Person(name:String, age:Int)
val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))
val rdd = sc.parallelize(people,2)
//rdd.takeOrdered(1)(Ordering[Int].reverse.on(x=>x.age))
rdd.takeOrdered(1)(Ordering[Int].reverse.on(_.age))
// Word count, then the 3 most frequent words (reversed ordering on the
// count) and the 3 least frequent (natural ordering on the count).
val rdd1 = sc.parallelize(List(("Hadoop PIG Hive"), ("Hive PIG PIG Hadoop"), ("Hadoop Hadoop Hadoop")))
val rdd2 = rdd1.flatMap(x => x.split(" ")).map(x => (x,1))
val rdd3 = rdd2.reduceByKey((x,y) => (x+y))
rdd3.takeOrdered(3)(Ordering[Int].reverse.on(x=>x._2))
rdd3.takeOrdered(3)(Ordering[Int].on(x=>x._2))
Find the max value together with its associated key.
val myRDD = sc.parallelize(Array(("a",1),("b",5),("c",1),("d",3))).sortBy(_._2,false).take(1)
How to calculate sum and count in a single groupBy?
// Sum AND count of Amt per id in a single groupBy. The original aggregated
// max("Amt") instead of a count, which did not answer the stated question.
val datasets = sc.parallelize(List(("A","HYD",10),("B","BLR",30),("A","HYD",40),("B","BLR",50),("C","DEL",60)))
val df1 = datasets.toDF("id", "Loc", "Amt")
import org.apache.spark.sql.functions._
df1.groupBy($"id").agg(sum($"Amt"), count($"Amt")).show()
Find the max value for each group
// Max value per key, approach 1: reduceByKey combines map-side, so only one
// value per key per partition crosses the network -- the preferred approach.
val rdd5=sc.parallelize(List(("v",3),("v", 1),("v", 1),("w", 7),("w", 1),("x", 3),("y", 1),("y", 1),("y", 2),("y", 3)))
rdd5.reduceByKey(math.max(_, _)).collect
// Approach 2: groupByKey ships every value for a key across the network
// before taking the max, so it is more expensive for the same result.
val rdd5=sc.parallelize(List(("v",3),("v", 1),("v", 1),("w", 7),("w", 1),("x", 3),("y", 1),("y", 1),("y", 2),("y", 3)))
rdd5.groupByKey().map(x => (x._1,x._2.max)).collect
Find the sum of the tuple values.
// Sum of the second tuple component (shown via two equivalent spellings),
// then sum/max/min over a simple numeric range.
val x = List(("X45", 2), ("W80", 1), ("F03", 2), ("X61", 2))
val rdd = sc.parallelize(x)
rdd.map(x => x._2).collect
rdd.map(x => x._2).sum
rdd.map(_._2).sum
val rdd2 = sc.parallelize(1 to 20)
rdd2.sum
rdd2.max
rdd2.min
// Total Amt per (id, Loc) pair via the DataFrame API.
val datasets = sc.parallelize(List(("A","HYD",10),("B","BLR",30),("A","HYD",40),("B","BLR",50),("C","DEL",60)))
val df1=datasets.toDF("id","Loc","Amt")
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
df1.groupBy("id","Loc").agg(sum("Amt")).show
Find the average value per key.
// Average value per key: aggregateByKey accumulates a (runningSum, count)
// pair per key, then the final division yields the mean.
val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
val agg_rdd = rdd.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // fold one value into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))  // merge per-partition accumulators
// Divide as Double: the original Int division silently truncates whenever
// the sum is not an exact multiple of the count.
val sum = agg_rdd.mapValues { case (s, c) => s.toDouble / c }
sum.collect
// Add the two components of each record's value tuple, then sum per key.
val data = Array(("9888wq",(1,2)),("abcd",(1,1)),("abcd",(3,2)),("9888wq",(4,2)))
val rdd= sc.parallelize(data)
val result = rdd.map(x => (x._1,(x._2._1+x._2._2))).reduceByKey((x,y) => x+y)
// NOTE(review): foreach(println) executes on the executors; on a real
// cluster the output lands in executor logs, not the driver console.
result.foreach(println)
Input
key1 value1
key1 value2
key2 value3
key3 value4
key3 value5
Output
key1 value1
key2 value3
key3 value4
I would like to get all the unique values among all the Array elements of this RDD. I don't care about the key; I just want all the unique values. So the result for the sample below is (1, 2, 3, 4, 5, 20, 30, 50, 400).
100, Array(1,2,3,4,5)
200,Array(1,2,50,20)
300, Array(30,2,400,1)
val result = rdd.flatMap(_._2).distinct
// Extract the distinct domain names (the token between '@' and the first
// '.') from a list of email addresses.
val rdd = sc.parallelize(List(("something1@domainA.com"),
("something2@domainA.com"),
("something3@domainB.com")))
// The partial function keeps only addresses that split into exactly two
// parts around '@'; headOption then takes the part before the first dot.
rdd.map(_.split("@")).flatMap { case Array(_, d) => d.split("\\.").headOption }.distinct.collect
// Sample DataFrame: one row per person, gender encoded as 1 (male) or
// 2 (female) -- see the legend below the printed table.
val df = Seq(
(1, 20, 21),
(2, 23, 22),
(1, 26, 23),
(2, 29, 24)
).toDF("Gender", "Age", "Value")
scala> df.show
+------+---+-----+
|Gender|Age|Value|
+------+---+-----+
| 1| 20| 21|
| 2| 23| 22|
| 1| 26| 23|
| 2| 29| 24|
+------+---+-----+
// Gender 1 = Male
// Gender 2 = Female
import org.apache.spark.sql.expressions.Window
// Number rows within each gender partition. Ordering by the partition
// column itself ("gender") makes every row in a partition tie, so
// row_number() would be assigned nondeterministically; ordering by age
// is deterministic and matches the displayed output.
val byGender = Window.partitionBy("gender").orderBy("age")
val males = df
.filter("gender = 1")
.select($"age" as "male_age",
$"value" as "male_value",
row_number() over byGender as "RN")
scala> males.show
+--------+----------+---+
|male_age|male_value| RN|
+--------+----------+---+
| 20| 21| 1|
| 26| 23| 2|
+--------+----------+---+
// Female rows, numbered within their window partition (byGender, defined
// above) so they can be joined positionally with the male rows on "RN".
val females = df
.filter("gender = 2")
.select($"age" as "female_age",
$"value" as "female_value",
row_number() over byGender as "RN")
scala> females.show
+----------+------------+---+
|female_age|female_value| RN|
+----------+------------+---+
| 23| 22| 1|
| 29| 24| 2|
+----------+------------+---+
scala> males.join(females, Seq("RN"), "outer").show
+---+--------+----------+----------+------------+
| RN|male_age|male_value|female_age|female_value|
+---+--------+----------+----------+------------+
| 1| 20| 21| 23| 22|
| 2| 26| 23| 29| 24|
No comments:
Post a Comment